{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img width=\"200\" src=\"https://mmlspark.blob.core.windows.net/graphics/emails/vw-blue-dark-orange.svg\" />\n",
    "\n",
    "# Binary Classification with VowpalWabbit on Criteo Dataset \n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## SparkML Vector input"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark.sql.types as T\n",
    "from pyspark.sql import functions as F\n",
    "\n",
    "schema = T.StructType(\n",
    "    [\n",
    "        T.StructField(\"label\", T.IntegerType(), True),\n",
    "        *[T.StructField(\"i\" + str(i), T.IntegerType(), True) for i in range(1, 13)],\n",
    "        *[T.StructField(\"s\" + str(i), T.StringType(), True) for i in range(26)],\n",
    "    ]\n",
    ")\n",
    "\n",
    "df = (\n",
    "    spark.read.format(\"csv\")\n",
    "    .option(\"header\", False)\n",
    "    .option(\"delimiter\", \"\\t\")\n",
    "    .schema(schema)\n",
    "    .load(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/criteo_day0_1k.csv.gz\")\n",
    ")\n",
    "# print dataset basic info\n",
    "print(\"records read: \" + str(df.count()))\n",
    "print(\"Schema: \")\n",
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Reformat into VW-native format\n",
    "See VW [docs](https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Input-format) for format details"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# create VW string format\n",
    "cols = [\n",
    "    F.col(\"label\"),\n",
    "    F.lit(\"|\"),\n",
    "    *[F.col(\"i\" + str(i)) for i in range(1, 13)],\n",
    "    *[F.col(\"s\" + str(i)) for i in range(26)],\n",
    "]\n",
    "\n",
    "df = df.select(F.concat_ws(\" \", *cols).alias(\"value\"))\n",
    "display(df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Split the dataset into train and test"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "train, test = df.randomSplit([0.6, 0.4], seed=1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Model Training"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.vw import VowpalWabbitGeneric\n",
    "\n",
    "# number of partitions determines data parallelism\n",
    "train = train.repartition(2)\n",
    "\n",
    "model = VowpalWabbitGeneric(\n",
    "    numPasses=5,\n",
    "    useBarrierExecutionMode=False,\n",
    "    passThroughArgs=\"--holdout_off --loss_function logistic --link logistic\",\n",
    ").fit(train)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Model Prediction"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "predictions = model.transform(test)\n",
    "\n",
    "predictions = predictions.withColumn(\n",
    "    \"prediction\", F.col(\"prediction\").cast(\"double\")\n",
    ").withColumn(\"label\", F.substring(\"value\", 0, 1).cast(\"double\"))\n",
    "\n",
    "display(predictions)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.train import ComputeModelStatistics\n",
    "\n",
    "metrics = ComputeModelStatistics(\n",
    "    evaluationMetric=\"classification\", labelCol=\"label\", scoredLabelsCol=\"prediction\"\n",
    ").transform(predictions)\n",
    "display(metrics)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
